package com.atguigu.common.utils;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by Smexy on 2022/6/25
 *
 *  提供一个生产者，生产数据到topic
 */
public class KafkaClientUtil {

    //声明生产者。 静态属性在类加载时，创建，只会创建一次。 单例
    private static Producer<String,String> producer = getProducer();

    //构造生产者，参数参考 ProducerConfig
    private static Producer<String,String> getProducer(){

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,PropertiesUtil.getProperty("kafka.broker.list"));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getProperty("key.serializer"));
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getProperty("value.serializer"));
        //生产幂等
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        properties.put(ProducerConfig.RETRIES_CONFIG,"3");

        return new KafkaProducer<String, String>(properties);

    }

    //提供public方法供别人使用
    public static  void sendDataToKafka(String topic,String value){

        producer.send(new ProducerRecord<>(topic,value));

    }

    //提供清空缓冲区方法
    public static  void flush(){
        producer.flush();
    }
}
